作者:陈广
日期:2018-3-26
上篇文章,我们讲解了使用最原始 Thread 进行编程,但 Thread 实际上现在已经很少使用了。用得比较多的是 Begin/End,即使现在查微软的网络编程文档,介绍的还是这种编程模型。它并不是最新的网络编程模型,而且使用和理解起来还比较困难、麻烦。但想到现在大多数项目用的都是这个模型,学习还是很有必要的。另外此处不再是授课内容,将使用最新 C# 语法。
之前我们在 Thread 编程中需要自己手动创建线程做一系列操作,Begin/End 编程模型则不再需要你去创建线程,它会在内部使用线程完成一系列操作,最重要的是它使用的是线程池。在 Thread 编程模型中,我们使用Socket.Accept()
方法接收连接,然后创建线程去处理这些连接。如果在短时间内有成百上千的连接,对这些连接一一创建线程显然会耗费大量服务器资源,而使用线程池当然是一个很好的选择。在异步模式下,服务器可以使用BeginAccept
方法和EndAccept
方法来接受客户端连接的任务,在客户端则通过BeginConnect
方法和EndConnect
方法来实现向服务器的连接请求。
先来个最简单的,服务器只接受一个客户端连接。
在 Visual Studio 中新建一个控制台应用程序,使用如下命名空间:
using System;
using System.Net;
using System.Net.Sockets;
使用如下代码:
static void Main(string[] args)
{
IPAddress ip = IPAddress.Parse("127.0.0.1");
IPEndPoint point = new IPEndPoint(ip, 5000);
Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
try
{
listener.Bind(point);
listener.Listen(5);
listener.BeginAccept(new AsyncCallback(AcceptCallback), listener);//开始侦听连接
Console.WriteLine("服务器开始侦听...");
}
catch(Exception e)
{
Console.WriteLine(e.Message);
}
Console.ReadLine();
}
//成功收到连接请求后调用的回调函数
static void AcceptCallback(IAsyncResult ar)
{
Socket listener = (Socket)ar.AsyncState; //将参数ar还原为Socket
Socket handler = listener.EndAccept(ar); //EndAccept对应BeginAccept,它会阻塞线程,直到收到连接后解除阻塞,并结束一个侦听周期,
Console.WriteLine($"侦听到来自{handler.RemoteEndPoint.ToString()}的连接请求");
}
listener.BeginAccept()
方法用于开始侦听客户端连接,它会去线程池开一个线程用于侦听服务,然后立即返回,继续执行下面的代码打印出服务器开始侦听...
,这点在之后运行程序的结果中可验证。当有客户端发送连接请求后,则会自动调用AcceptCallback
回调函数。
BeginAccept
方法原型为:
public IAsyncResult BeginAccept(AsyncCallback callback, object state);
public delegate void AsyncCallback(IAsyncResult ar);
参数ar
是一个IAsyncResult
接口实例,用于表示线程状态callback
参数ar
的AsyncState
属性,在此处应传递侦听 Socket 的实例,以便使用此 Socket 进行后续操作。BeginAccept
和EndAccept
需要成对使用,本例在主线程中开始侦听时使用BeginAccept
;在接收到连接后调用的回调函数中使用EndAccept
阻塞工作线程,并在收到一个连接后解除阻塞,从而结束一个侦听周期。
从代码可知,这种编程机制已经非常古老,使用的还是最原始的委托。现在可以使用lambda表达式将回调函数直接写在BeginAccept()
方法中:
listener.BeginAccept(new AsyncCallback((ar)=> {
Socket s = (Socket)ar.AsyncState;
Socket handler = s.EndAccept(ar);
Console.WriteLine($"侦听到来自{handler.RemoteEndPoint.ToString()}的连接请求");
}), listener);
甚至可以使用闭包机制,直接调用主线程 Socket,不再需要参数传递:
listener.BeginAccept(new AsyncCallback((ar)=> {
Socket handler = listener.EndAccept(ar);
Console.WriteLine($"侦听到来自{handler.RemoteEndPoint.ToString()}的连接请求");
}), null);
以上两种方式测试可以使用,但实际使用时闭包会不会出问题,不得而知。之后的代码还是老老实实按照微软文档的方式,使用委托。
新建一个控制台应用程序,使用如下命名空间:
using System;
using System.Net;
using System.Net.Sockets;
使用如下代码:
static void Main(string[] args)
{
IPAddress ip = IPAddress.Parse("127.0.0.1");
try
{
Socket s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
IPEndPoint point = new IPEndPoint(ip, 5000);
s.BeginConnect(point, new AsyncCallback(ConnectCallback), s); //向服务器发起连接
Console.WriteLine($"开始连接服务器 {ip.ToString()} ...");
}
catch(Exception e)
{
Console.WriteLine(e.Message);
}
Console.ReadLine();
}
//连接成功后的回调函数
static void ConnectCallback(IAsyncResult ar)
{
try
{
Socket s = (Socket)ar.AsyncState;
s.EndConnect(ar);//连接结束
Console.WriteLine($"成功连接服务器 {s.RemoteEndPoint.ToString()} ");
}
catch(Exception e)
{
Console.WriteLine(e.Message);
}
}
理解了BeginAccept
和EndAccept
,也就理解了BeginConnect
和EndConnect
。两者使用上没什么分别,都是成对出现,一个开始,一个结束,甚至使用的委托都完全相同。
运行效果如下图所示:
从运行结果可知,开第一个客户端,服务器收到连接,但开第二个客户端,服务器就没有收到连接了。这是因为一个BeginConnect
只能接收一个连接,而程序只执行了一次BeginConnect
。要想重复接收,只能象之前处理的一样,使用while(true)
不断循环执行BeginConnect
。
由于BeginConnect
并不阻塞程序,直接套while(true)
肯定是行不通的,瞬间开几万条线程是分分钟的事。所以只能手动阻塞,微软示例中使用的是ManualResetEvent
。这个类我们在多线程相关文章中并没有讲到,但它的使用方法和ManualResetEventSlim
一样,请参考《线程同步》这篇文章。
ManualResetEvent
的机制是发一个信号可以允许多条等待线程通过:
更改服务器代码如下:
public static ManualResetEvent allDone = new ManualResetEvent(false);//线程信号
static void Main(string[] args)
{
IPAddress ip = IPAddress.Parse("127.0.0.1");
IPEndPoint point = new IPEndPoint(ip, 5000);
Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
try
{
listener.Bind(point);
listener.Listen(5);
Console.WriteLine("服务器开始侦听...");
while (true)
{
allDone.Reset();//关门
listener.BeginAccept(new AsyncCallback(AcceptCallback), listener);//开始侦听连接
allDone.WaitOne();//阻塞主线程
}
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
Console.ReadLine();
}
//成功收到连接请求后调用的回调函数
static void AcceptCallback(IAsyncResult ar)
{
allDone.Set();//开门
Socket listener = (Socket)ar.AsyncState;
Socket handler = listener.EndAccept(ar);
Console.WriteLine($"侦听到来自{handler.RemoteEndPoint.ToString()}的连接请求");
}
运行程序并直接使用上例中的客户端程序,多开几个,运行效果如下:
现在可以接收多个客户端的连接了。
画张图演示程序运行过程
虽然 ManualResetEvent 一次允许多条线程通过,但这里只有主线程调用,所以每次只会有主线程一条线程通过。所以执行过程如图所示:BeginAccept
执行时会到线程池取线程进行侦听,同时到门口等着门打开,当客户端有连接请求时,在处理连接的同时会把门打开,使得主线程通过大门进入到下一个 Accept 周期,同时大门关闭。
从这段代码可知,虽然BeginAccept
另外开了一条线程进行监听,但主线程还是会被allDone.WaitOne()
阻塞住的,所以在实际应用中,还得要专门开一个线程来处理侦听连接。这就比之前用用线程处理同步的Accept
多一个线程了,使用起来还是相对麻烦的。当然,这里用到线程池以及专门的机制,对I/O密集型操作当然更好,出错的几率更低。
数据的接收与 Accept 和 Connect 机制类似,也是使用BeginReceive
和EndReceive
配对使用。
首先来看看BeginReceive
方法原型:
public IAsyncResult BeginReceive(byte[] buffer, int offset, int size, SocketFlags socketFlags, AsyncCallback callback, object state);
其中:
EndReceive
这些参数后两个本文之前已经提到过。而前面的四个参数除了 offset 相信大家在上篇文章中全都使用过,只是在这里需要作为参数传递而已。offset 一般情况下设置为0即可。
更改服务器代码如下:
public class StateObject //将BeginReceive需要的的参数包装在此类中进行传递
{
public Socket workSocket = null;
public const int BufferSize = 1024;//缓冲区大小
public byte[] buffer = new byte[BufferSize];//接收缓冲
}
public static ManualResetEvent allDone = new ManualResetEvent(false);//线程信号
static void Main(string[] args)
{
IPAddress ip = IPAddress.Parse("127.0.0.1");
IPEndPoint point = new IPEndPoint(ip, 5000);
Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
try
{
listener.Bind(point);
listener.Listen(5);
Console.WriteLine("服务器开始侦听...");
while (true)
{
allDone.Reset();//关门
listener.BeginAccept(new AsyncCallback(AcceptCallback), listener);//开始侦听连接
allDone.WaitOne();//阻塞主线程
}
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
Console.ReadLine();
}
//成功收到连接请求后调用的回调函数
static void AcceptCallback(IAsyncResult ar)
{
allDone.Set();//开门
Socket listener = (Socket)ar.AsyncState;
Socket handler = listener.EndAccept(ar);
Console.WriteLine($"侦听到来自{handler.RemoteEndPoint.ToString()}的连接请求");
StateObject state = new StateObject();
state.workSocket = handler;
handler.BeginReceive(state.buffer, 0, StateObject.BufferSize, SocketFlags.None,
new AsyncCallback(ReadCallback), state);
}
//收到 socket 发送的信息后触发的接收回调函数
static void ReadCallback(IAsyncResult ar)
{
try
{
StateObject state = (StateObject)ar.AsyncState;
Socket handler = state.workSocket;
int count = handler.EndReceive(ar);//这句会阻塞程序,直接接收到数据为止
string recvStr = Encoding.Unicode.GetString(state.buffer, 0, count);
Console.WriteLine($"收到{handler.RemoteEndPoint.ToString()}发来的信息:{recvStr}");
//进入到下一个等待接收周期
handler.BeginReceive(state.buffer, 0, StateObject.BufferSize, SocketFlags.None,
new AsyncCallback(ReadCallback), state);
}
catch(Exception e)
{
Console.WriteLine(e.Message);
}
}
本例使用了一个内部类StateObject
将BeginReceive
所需参数包装起来并方便在回调函数中进行传递。
另外,我们也注意到,跟上一篇文章中的同步接收进行对比,这里没有使用while(true)
循环接收数据,而是直接在回调函数内再一次调用BeginReceive
而进入到下一个接收周期。然后本线程结束,接力棒交到下一个ReadCallback
。有点递归的感觉。流程如下图所示:
数据的发送使用BeginSend
和EndSend
配对使用。
public IAsyncResult BeginSend(byte[] buffer, int offset, int size, SocketFlags socketFlags, AsyncCallback callback, object state);
参数和之前的BeginReceive
完全一样,这里就不再赘述了。
将客户端代码更改如下:
private static ManualResetEvent connectDone = new ManualResetEvent(false);//控制连接
private static ManualResetEvent sendDone = new ManualResetEvent(false);//控制发送
static void Main(string[] args)
{
IPAddress ip = IPAddress.Parse("127.0.0.1");
try
{
Socket s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
IPEndPoint point = new IPEndPoint(ip, 5000);
s.BeginConnect(point, new AsyncCallback(ConnectCallback), s); //向服务器发起连接
Console.WriteLine($"开始连接服务器 {ip.ToString()} ...");
connectDone.WaitOne();//等待连接成功
Console.WriteLine("开始发送信息");
for (int i = 0; i < 10; i++)
{
byte[] sendBuff = Encoding.Unicode.GetBytes($"消息 {i}");
s.BeginSend(sendBuff, 0, sendBuff.Length, SocketFlags.None,
new AsyncCallback(SendCallback), s);
sendDone.WaitOne();//等待发送成功
Thread.Sleep(1000);//挂起1秒,稍后删掉
}
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
Console.ReadLine();
}
//连接成功后的回调函数
static void ConnectCallback(IAsyncResult ar)
{
try
{
Socket s = (Socket)ar.AsyncState;
s.EndConnect(ar);
Console.WriteLine($"成功连接服务器 {s.RemoteEndPoint.ToString()} ");
connectDone.Set();//开门,通知主线程连接完成
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
}
//发送回调函数
static void SendCallback(IAsyncResult ar)
{
try
{
Socket s = (Socket)ar.AsyncState;
int count = s.EndSend(ar);
sendDone.Set();//开门,通知主线程继续发送信息
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
}
由于BeginSend
是异步操作,为避免多条信息同时发送,使用sendDone
控制顺序发送,使得一条信息发送完毕后再发送下一条。运行程序,效果如下图所示:
另外,使用connectDone
等待连接成功,如果注释掉 Main 方法中的sendDone.WaitOne();
,将会看到先打印发送消息,然后再打印连接成功。
将上例代码中的客户端 Main 方法中的Thread.Sleep(1000);
这句代码删除,再次运行,结果如下图所示:
对比上例运行结果,我们发现发送的消息全连在一起了。这是因为发送速度太快,旧的数据还未接收,新的数据已经压入缓冲。这种现象叫粘包,前面讲的使用同步接收也会出现同样的问题,只是当时每发送一条消息后,都会停顿一段时间,所以没出现而已。要解决这类问题,只能手动划定边界了。
在客户端发消息时,在每条消息后面加一个’\n’作为结束标记,然后在服务器端解析消息时,以’\n’为界,将取出的字符串分段打印。
客户端发消息的for
循环改为:
for (int i = 0; i < 1000; i++)
{ //给每条消息后面加一个'\n'作为结束标记
byte[] sendBuff = Encoding.Unicode.GetBytes($"消息 {i}"+"\n");
s.BeginSend(sendBuff, 0, sendBuff.Length, SocketFlags.None,
new AsyncCallback(SendCallback), s);
sendDone.WaitOne();
}
服务器端的ReadCallback
方法代码改为:
static void ReadCallback(IAsyncResult ar)
{
try
{
StateObject state = (StateObject)ar.AsyncState;
Socket handler = state.workSocket;
int count = handler.EndReceive(ar);
string recvStr = Encoding.Unicode.GetString(state.buffer, 0, count);
int i = 0, pos = 0;
//以'\n'为界,取出每段字符
while (i < recvStr.Length)
{
if (recvStr[i] == '\n')
{
string s = recvStr.Substring(pos, i - pos);
Console.WriteLine(s);
pos = i + 1;
}
i++;
}
//进入到下一个等待接收周期
handler.BeginReceive(state.buffer, 0, StateObject.BufferSize, SocketFlags.None,
new AsyncCallback(ReadCallback), state);
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
}
运行程序,这里只贴出服务端结果:
结果看似没问题,但浏览全部收到的消息,可以发现,每隔一段就会出现字符丢失的情况。这是因为缓冲区满时,一条消息可能会被分成两段,第一段在这次发送,第二段在下次发送。使用当前算法使得第一段丢失。所以需要改进算法,将第一段保存下来,留到下次接收时接上。继续更改服务器代码:
更改StateObject
类代码如下:
public class StateObject
{
public Socket workSocket = null;
public const int BufferSize = 1024;//缓冲区大小
public byte[] buffer = new byte[BufferSize];//接收缓冲
public string remainStr = ""; //上次接收的消息解析后的剩余部分
}
更改ReadCallback
代码如下:
static void ReadCallback(IAsyncResult ar)
{
try
{
StateObject state = (StateObject)ar.AsyncState;
Socket handler = state.workSocket;
int count = handler.EndReceive(ar);
string recvStr = Encoding.Unicode.GetString(state.buffer, 0, count);
int i = 0, pos = 0;
//找到第一个结束标记,将第一段字符串取出跟上次剩余的字符串合并
while (recvStr[i] != '\n')
{
i++;
}
Console.WriteLine($"{state.remainStr}{recvStr.Substring(0, i)}");
pos = ++i;
//以'\n'为界,取出每段字符
while (i < recvStr.Length)
{
if (recvStr[i] == '\n')
{
string s = recvStr.Substring(pos, i - pos);
Console.WriteLine(s);
pos = i + 1;
}
i++;
}
//剩余的无'\n'结尾的字符存入state.remainStr供下次使用
state.remainStr = recvStr.Substring(pos, i - pos);
//进入到下一个等待接收周期
handler.BeginReceive(state.buffer, 0, StateObject.BufferSize, SocketFlags.None,
new AsyncCallback(ReadCallback), state);
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
}
现在再运行程序,就不会有字符丢失了。
Begin/End 就介绍到这吧,也不做例子了。因为这并不是最新的编程模型,了解即可,以后看别人的源码可能会用到。